# 协调器

欢迎来到 `dora` 教程的最后一章！在本教程中，我们学习了`数据流`、`节点`和`操作符` ，以及它们如何通过事件流进行通信并高效地传输数`据消息/Arrow Data`。
我们还了解了如何使用 `API` 绑定编写逻辑，以及如何使用 `Dora CLI` 作为命令行界面。最近，在 [Dora 守护进程](./dora-daemon) 中，我们学习了 `Dora` 守护进程，它是在单台机器上本地管理 `dora` 组件的后台服务。

但是，当你的应用程序变得更大，需要使用分布在多台计算机上的传感器、处理器或硬件时，会发生什么情况呢？例如，你可能会遇到：

- 一个机器人上的一个摄像头。
- 在强大的台式计算机上运行的重型 AI 模型。
- 另一块硬件上的电机控制器。
- 笔记本电脑上的可视化显示。

您的 `Dataflow` 需要跨越所有这些机器。您将在每台机器上运行一个 `Dora Daemon` 来管理本地节点 。但是，如何让系统在正确的机器上启动正确的节点 ，协调它们之间通过网络的通信，并管理这个分布式应用程序的整个生命周期呢？

这就是 **`Dora Coordinator` 协调器** 的工作。

## 空中交通管制塔

可以将 `Dora 协调器` 想象成整个 `dora` 系统的中央空中交通管制塔 ，尤其是在数据流分布于多台机器的情况下。每个 `Dora` 守护进程都是其本地工厂车间的“领班”（管理一台机器上的节点），而协调器则是高级管理者，负责监督所有不同的工厂车间，并确保所有设备和谐地协同工作。

其主要职责是：

1. **接收命令：** 它从 `Dora CLI` 接收有关整个数据流的指令（例如“启动此数据流”、“停止该数据流”）。
2. **了解景观：** 它跟踪不同机器上可用的 `Dora Daemons` 。
3. **数据流编排：** 它读取数据流 `YAML` ，了解哪些节点需要在哪台机器上运行（如果指定），并告诉适当的 `Dora` 守护进程启动这些节点 。
4. **管理生命周期：** 它管理数据流的整体状态（运行、停止等）并协调跨多台机器停止或重新加载节点等命令。
5. **协调通信（高级）：** 守护进程处理本地数据传输（如共享内存 ）时，协调器确保底层传输（如 `Zenoh`，通常用于分布式设置）配置正确，以便数据可以按照数据流蓝图在不同机器上的守护进程之间流动。

本质上，协调器是连接用户（通过 `CLI`）和守护进程和节点的分布式集合的大脑。

## 将协调器与分布式数据流结合使用

当你使用 `dora run yolo.yml` 在一台机器上本地运行数据流时，`CLI` 可能会直接与本地 `Dora Daemon` 通信，或者它可能会自动启动并与本地 `Coordinator` 进程通信，然后由该进程管理本地 `Daemon`。对于在单机上运行的初学者来说，`Coordinator` 的存在可能一开始不太明显。

然而，当你定义一个跨多台机器的数据流时，协调器就变得至关重要，并且需要明确指定。你可以使用数据流 `YAML` 文件中的 `_unstable_deploy: machine` 字段来告诉 `dora` 每个节点应该在哪里运行。

以下是分布式数据流的简化示例 `YAML`：

```yaml
nodes:
  # 机器 A 上的摄像头节点（例如，机器人）
  - id: camera-robot
    path: camera-node-code # 代码在机器人上运行
    outputs:
      - image
    _unstable_deploy: # 告诉 Dora 将此节点部署到哪里
      machine: robot-machine-A # 需要在机器“robot-machine-A”上运行守护进程

  # 机器 B 上的对象检测节点（例如，一台功能强大的 PC）
  - id: object-detector-pc
    path: yolo-node-code # 代码在PC上运行
    inputs:
      image: camera-robot/image # 接收来自机器人A的图像
    outputs:
      - bbox
    _unstable_deploy:
      machine: desktop-machine-B # 需要在机器“desktop-machine-B”上运行守护进程

  # 在机器 C 上绘图的节点（例如笔记本电脑）
  - id: plot-laptop
    path: plot-node-code # 代码在笔记本电脑上运行
    inputs:
      image: camera-robot/image # 接收来自机器人A的 image
      boxes2d: object-detector-pc/bbox # 从桌面计算机 B 接收 bbox
    _unstable_deploy:
      machine: laptop-machine-C # 需要在机器“laptop-machine-C”上运行守护进程
```

在此设置中，您将拥有：

- 在 `robot-machine-A` 上运行的 `Dora Daemon` 。
- 在 `desktop-machine-B` 上运行的 `Dora Daemon` 。
- 在 `laptop-machine-C` 上运行的 `Dora Daemon` 。
- 至关重要的是，单个 `Dora Coordinator` 进程在所有这些机器可以访问的地方运行（它甚至可以在机器人、`PC` 或笔记本电脑之一上，或单独的服务器上）。

要运行这个分布式数据流，您可以使用 `Dora CLI` 并告诉它协调器的位置，通常使用 `--coordinator-addr` 标志：

```bash
# 假设协调器在 IP 地址 192.168.1.100 上运行
dora run distributed_dataflow.yml --coordinator-addr 192.168.1.100 
```

执行此命令时：

1. `Dora CLI` 程序启动。
2. 它连接到在 `192.168.1.100` 运行的 `Dora Coordinator` 进程。
3. 它将 `distributed_dataflow.yml` 内容发送给协调器。
4. 协调器读取 `YAML` 并查看哪些节点分配给哪些机器（ `robot-machine-A` 、 `desktop-machine-B` 、 `laptop-machine-C` ）。
5. 协调器识别与这些机器 `ID` 关联的 `Dora` 守护进程 （守护进程在启动时向协调器注册）。
6. 协调器向 `robot-machine-A` 上的守护进程发送指令以启动 `camera-robot` 节点。
7. 协调器向 `desktop-machine-B` 上的守护进程发送指令以启动 `object-detector-pc` 节点。
8. 协调器向 `laptop-machine-C` 上的守护进程发送指令以启动 `plot-laptop` 节点。
9. 协调器还确保正确设置底层分布式通信层（如 `Zenoh`），以便数据在承载连接节点的守护进程之间流动（ `camera-robot` 输出 -> `object-detector-pc` 输入、 `camera-robot` 输出 -> `plot-laptop` 输入、 `object-detector-pc` 输出 -> `plot-laptop` 输入）。
10. 协调器继续运行，监视守护进程和数据流的状态，并将状态或日志消息（如果需要）传回 `CLI`。

要停止此分布式数据流，您可以再次使用 `CLI`，告诉它与哪个协调器对话：

```bash
dora stop my_dataflow_id --coordinator-addr 192.168.1.100
```

`CLI` 告诉协调器停止数据流，然后协调器向相关守护进程发送停止命令，守护进程又向它们管理的节点发送 `STOP` 事件。

## 协调器是如何工作的

`Dora` 协调器作为独立、专用的后台进程运行。其主要目标是维护跨多台机器的 `dora` 系统的全局视图。

- 通信： 它通过 `TCP/IP` 与 `Dora` 守护进程通信。守护进程启动时会连接到协调器。
- 状态管理： 它保存正在运行的数据流的整体状态，包括哪些节点属于哪个数据流以及每个节点被分配到哪个守护进程/机器。
- 事件循环： 与其他 `dora` 组件一样，`Coordinator` 也有一个事件循环。它会等待并处理不同的事件：
- 新的守护进程连接。
- 来自 `Dora CLI` 的请求（例如， `Start` 、 `Stop` 、 `List` 、 `Build` 、 `Logs` 、 `Destroy` ）。
- 来自守护进程的消息（例如，“我的守护进程 ID 是......”，“数据流 X 已完成结果......”，“我的机器上的节点 Y 在订阅之前退出”）。
- 内部事件（如心跳定时器，用于检查守护进程是否仍然活跃）。
- 编排逻辑： 当它收到数据流的 `Start` 等命令时：
- 它解析 `YAML` 来了解节点部署。
- 它检查其连接的守护进程列表，以查看所需的机器是否可用。
- 然后，它将特定的 `DaemonCoordinatorEvent` 消息（如 `Spawn(SpawnDataflowNodes)` ）发送给相关的守护进程，告诉每个守护进程它负责为这个特定的数据流启动哪些节点。
- 它跟踪来自守护进程的回复（ `DaemonCoordinatorReply` ）以确认节点已成功生成或构建（ `DataflowSpawnResult` ， `DataflowBuildResult` ）。
- 一旦所有需要的节点在各自的守护进程上报告准备就绪，协调器就会向所有涉及的守护进程发送 `AllNodesReady` 事件，节点可以使用该事件（例如开始发送数据）。
- 当它收到 `Stop` 命令时，它会向所有涉及的守护进程发送 `StopDataflow` 事件。

下面是一个简化的序列图，说明如何启动分布式数据流：

![coordinator](/coordinator01.png)

这表明协调器充当着协调分布式守护进程动作的中心点。

您可以在 `dora-coordinator` 包 ( `binaries/coordinator/src/lib.rs` ) 中找到 `Dora Coordinator` 的核心实现。该文件包含 `start` 函数，该函数用于设置传入连接（来自守护进程和 CLI 控制平面）的监听器，以及处理不同事件类型（ `Event::NewDaemonConnection` 、 `Event::Control` 、 `Event::Daemon` 等）的主异步循环 ( `start_inner` )。

协调器与守护进程之间交换的消息（例如 `DaemonCoordinatorEvent` 和 `DaemonCoordinatorReply` ）定义在 `libraries/message/src/coordinator_to_daemon.rs` 和 `libraries/message/src/daemon_to_coordinator.rs` 中。`CLI` 命令定义在 `libraries/message/src/cli_to_coordinator.rs` 中。协调器的事件循环接收这些消息并触发相应的操作。

## 总结

`Dora Coordinator` 是 `dora` 系统的中央编排器和指挥中心，对于管理分布在多台机器上的数据流至关重要。它接收来自 `Dora CLI` 的指令，跟踪可用的 `Dora Daemons` ，并根据数据流 YAML 蓝图协调在正确的机器上启动、停止和管理节点 。通过监督分布式系统，它使您能够无缝构建和管理复杂的多机器应用程序。

至此，我们对 `dora` 核心概念的入门教程就结束了。我们介绍了`数据流`、`构建块（节点、操作符）`、`通信机制（事件流、数据消息/Arrow Data、API 绑定）`以及`运行时基础架构（命令行界面、守护进程、协调器）`。现在，您对 `dora` 工作原理及其主要组件的作用有了基本的了解，这将帮助您更深入地构建自己的实时数据流应用程序。

